/**
 * QUANSHI.com Inc.
 * Copyright (c) 2016-2017 All Rights Reserved.
 */
package com.quanshi.scheduler.core.thread;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Arrays;
import java.util.Date;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.quanshi.scheduler.core.base.Ret;
import com.quanshi.scheduler.core.biz.model.HandleCallbackParam;
import com.quanshi.scheduler.core.biz.model.TriggerParam;
import com.quanshi.scheduler.core.executor.IJobExecutor;
import com.quanshi.scheduler.core.handler.IJobHandler;
import com.quanshi.scheduler.core.log.IJobFileAppender;
import com.quanshi.scheduler.core.log.IJobLogger;

/**
 * Job thread: a worker thread bound to one job id and one {@link IJobHandler}.
 *
 * <p>Trigger requests are queued through {@link #pushTriggerQueue(TriggerParam)} and
 * executed one at a time by {@link #run()}. The thread is stopped cooperatively via
 * {@link #toStop(String)}; any triggers still queued at that point are reported back
 * as failed through {@code TriggerCallbackThread}.
 *
 * <p>Thread-safety: {@code pushTriggerQueue}, {@code toStop} and
 * {@code isRunningOrHasQueue} are meant to be called from other threads; the flags
 * they share with {@code run()} are {@code volatile}.
 *
 * @author chinaxiang
 * @version 2017-07-07 22:35:37
 */
public class JobThread extends Thread {

    private static final Logger                     logger    = LoggerFactory.getLogger(JobThread.class);

    private final long                              jobId;
    private final IJobHandler                       handler;
    private final LinkedBlockingQueue<TriggerParam> triggerQueue;
    // log ids currently waiting in the queue; avoids repeated triggers for the same TRIGGER_LOG_ID
    private final ConcurrentHashSet<Long>           triggerLogIdSet;

    // stop signal; written by toStop() from another thread, so it must be volatile
    private volatile boolean                        toStop    = false;
    // written before toStop is set, so it is visible once toStop is observed as true
    private volatile String                         stopReason;
    // whether a job is currently executing; read by other threads via isRunningOrHasQueue()
    private volatile boolean                        running   = false;
    // consecutive loop iterations without a trigger; only touched by the run() thread
    private int                                     idleTimes = 0;

    /**
     * Creates a job thread with an empty trigger queue.
     *
     * @param jobId   id of the job this thread serves
     * @param handler handler invoked for every trigger
     */
    public JobThread(long jobId, IJobHandler handler) {
        this.jobId = jobId;
        this.handler = handler;
        this.triggerQueue = new LinkedBlockingQueue<TriggerParam>();
        this.triggerLogIdSet = new ConcurrentHashSet<Long>();
    }

    /**
     * @return the handler this thread executes
     */
    public IJobHandler getHandler() {
        return handler;
    }

    /**
     * Adds a new trigger to the queue.
     *
     * @param triggerParam trigger to enqueue
     * @return {@code Ret.SUCCESS} when queued, or a {@code Ret.FAIL_CODE} result when a
     *         trigger with the same log id is already waiting in the queue
     */
    public Ret<String> pushTriggerQueue(TriggerParam triggerParam) {
        // add() is the atomic "check and insert": a separate contains()+add() would let two
        // concurrent callers both pass the check and enqueue the same log id twice.
        if (!triggerLogIdSet.add(triggerParam.getLogId())) {
            logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId());
            return new Ret<String>(Ret.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
        }

        triggerQueue.add(triggerParam);
        return Ret.SUCCESS;
    }

    /**
     * Requests this job thread to stop.
     *
     * @param stopReason reason reported in logs and failure callbacks
     */
    public void toStop(String stopReason) {
        /**
         * Thread.interrupt only ends a thread's blocking state (wait, join, sleep) by
         * throwing InterruptedException at the blocking point; it does not terminate the
         * running thread itself. To really shut this thread down a shared flag is needed.
         */
        // Publish the reason first: the volatile write of toStop below guarantees that a
        // reader which sees toStop == true also sees this stopReason.
        this.stopReason = stopReason;
        this.toStop = true;
    }

    /**
     * Tells whether a job is executing or triggers are still queued.
     *
     * @return {@code true} if a job is running or the trigger queue is not empty
     */
    public boolean isRunningOrHasQueue() {
        return running || triggerQueue.size() > 0;
    }

    /**
     * Main loop: polls the trigger queue until a stop is requested, executes the handler
     * for each trigger and pushes the result to {@code TriggerCallbackThread}. After the
     * loop ends, every trigger still queued is reported back as failed.
     */
    @Override
    public void run() {
        while (!toStop) {
            running = false;
            idleTimes++;
            try {
                // the stop flag has to be re-checked periodically, so poll(timeout) is used instead of take()
                TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
                if (triggerParam != null) {
                    running = true;
                    idleTimes = 0;
                    triggerLogIdSet.remove(triggerParam.getLogId());

                    // parse param
                    String[] handlerParams = parseHandlerParams(triggerParam.getExecutorParams());

                    // handle job
                    Ret<String> executeResult = null;
                    try {
                        // log filename: yyyy-MM-dd/9999.log
                        String logFileName = IJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId());

                        IJobFileAppender.contextHolder.set(logFileName);
                        IJobLogger.log("<br>----------- job execute start -----------<br>----------- Params:" + Arrays.toString(handlerParams));

                        executeResult = handler.execute(handlerParams);
                        if (executeResult == null) {
                            executeResult = Ret.FAIL;
                        }

                        IJobLogger.log("<br>----------- job execute end(finish) -----------<br>----------- Return:" + executeResult);
                    } catch (Throwable e) {
                        // Throwable (not just Exception): an Error raised by the handler must
                        // still produce a failure callback instead of silently losing the trigger.
                        if (toStop) {
                            IJobLogger.log("<br>----------- JobThread toStop, stopReason:" + stopReason);
                        }

                        String errorMsg = stackTraceOf(e);
                        executeResult = new Ret<String>(Ret.FAIL_CODE, errorMsg);

                        IJobLogger.log("<br>----------- JobThread Exception:" + errorMsg + "<br>----------- job execute end(error) -----------");
                    }

                    // callback handler info
                    if (!toStop) {
                        // normal completion: report the handler's result
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult));
                    } else {
                        // is killed
                        Ret<String> stopResult = new Ret<String>(Ret.FAIL_CODE, stopReason + " [业务运行中，被强制终止]");
                        TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
                    }
                } else {
                    // more than 30 consecutive iterations without a trigger (each poll waits up to 3 seconds)
                    if (idleTimes > 30) {
                        IJobExecutor.removeJobThread(jobId, "excutor idel times over limit.");
                    }
                }
            } catch (Throwable e) {
                // Keep the loop alive on any failure (including InterruptedException from poll);
                // the loop condition decides whether to exit.
                if (toStop) {
                    IJobLogger.log("<br>----------- job toStop, stopReason:" + stopReason);
                }

                IJobLogger.log("----------- job JobThread Exception:" + stackTraceOf(e));
            }
        }

        // callback trigger request in queue
        while (triggerQueue != null && triggerQueue.size() > 0) {
            TriggerParam triggerParam = triggerQueue.poll();
            if (triggerParam != null) {
                // is killed
                Ret<String> stopResult = new Ret<String>(Ret.FAIL_CODE, stopReason + " [任务尚未执行，在调度队列中被终止]");
                TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
            }
        }

        logger.info(">>>>>>>>>>>> job JobThread stoped, hashCode:{}", Thread.currentThread());
    }

    /**
     * Splits the comma-separated executor parameter string.
     *
     * <p>The array returned by {@code split} is used directly. Casting the result of
     * {@code Arrays.asList(...).toArray()} to {@code String[]} fails with
     * {@code ClassCastException} on Java 9+, where that call returns a real {@code Object[]}.
     *
     * @param executorParams raw parameter string, may be {@code null}
     * @return the split parameters, or {@code null} when the input is {@code null} or blank
     */
    private static String[] parseHandlerParams(String executorParams) {
        if (executorParams == null || executorParams.trim().length() == 0) {
            return null;
        }
        return executorParams.split(",");
    }

    /**
     * Renders the full stack trace of a throwable as a string.
     *
     * @param e throwable to render
     * @return the text produced by {@code printStackTrace}
     */
    private static String stackTraceOf(Throwable e) {
        StringWriter stringWriter = new StringWriter();
        e.printStackTrace(new PrintWriter(stringWriter));
        return stringWriter.toString();
    }
}
